package com.zyx.service;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.zyx.constants.Constants;
import com.zyx.constants.ChannelMap;
import com.zyx.dto.ResponseDto;
import com.zyx.netty.codec.DynamicHttpCodec;
import com.zyx.resource.DynProperties;
import com.zyx.utils.DynamicMsgUtils;
import com.zyx.vo.NettyHttpRequest;
import com.zyx.vo.NettyHttpResponse;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.net.InetAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static com.zyx.constants.Constants.*;
import static com.zyx.constants.NettyConstants.CURRENT_REQ_BOUND_WITH_THE_CHANNEL;
import static com.zyx.constants.NettyConstants.NETTY_RESPONSE_PROMISE_NOTIFY_EVENT_LOOP;
import static com.zyx.constants.RetCode.RETCODE_9001;
import static com.zyx.constants.RetCode.RETCODE_OK;

/**
 * 描述说明:
 *
 * @author DQing
 * @version 1.0
 * @className AbstractMsgService
 * @date 2020/4/2 17:49
 */
@Slf4j
@Component
public abstract class AbstractMsgService {

    @Autowired
    DynProperties dynProperties;
    private final Long TIME_OUT = 30L;

    /**
     * 响应业务平台消息
     * @param retCode
     * @param rpid
     */
    protected String resBusiMessage(String retCode,String rpid){
        Map<String,String> map = new HashMap<>();
        map.put(INNER_RPID,rpid);
        map.put(INNER_RES_RETCODE,retCode);
        map.put(INNER_RES_RETMSG, dynProperties.get(retCode));
        String resData = JSON.toJSONString(map);
        log.info("响应业务平台报文消息：{}",resData);
        return resData;
    }

    /**
     * 获取远程设备响应报文
     * @param channel    请求远程设备channel通道
     * @param msg        请求远程的报文
     * @param uri        请求远程的URI
     * @param methodType 请求远程的类型
     * @return
     */
    protected NettyHttpResponse getRemotingResponse(Channel channel,
                                                    String msg,
                                                    String uri,
                                                    String methodType){
        FullHttpRequest httpRequest = requestMsg(msg,uri,methodType);
        Promise<NettyHttpResponse> defaultPromise = NETTY_RESPONSE_PROMISE_NOTIFY_EVENT_LOOP.newPromise();
        NettyHttpRequest context = new NettyHttpRequest(httpRequest, defaultPromise);
        channel.attr(CURRENT_REQ_BOUND_WITH_THE_CHANNEL).set(context);

        DynamicHttpCodec.responseHandler(channel.pipeline(),true);
        ChannelFuture channelFuture = channel.writeAndFlush(httpRequest);
        channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
            @Override
            public void operationComplete(Future<? super Void> future) throws Exception {
                log.info(Thread.currentThread().getName() + " 请求发送完成");
            }
        });
        return get(defaultPromise);
    }

    /**
     * 封装请求远程的报文
     * @param msg      请求报文
     * @param uri      请求URI
     * @param methodType 请求报文类型
     */
    private FullHttpRequest requestMsg(String msg, String uri, String methodType){
        log.info("# 封装请求外部报文");
        FullHttpRequest request;
        HttpMethod method = new HttpMethod(methodType);
        if(StringUtils.isEmpty(msg)){
            request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, method,uri);
        }else{
            request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, method,
                    uri, Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
            request.headers().
                    set(HttpHeaderNames.CONTENT_LENGTH, msg.getBytes(CharsetUtil.UTF_8).length);
        }
        request.headers().
                set(HttpHeaderNames.CACHE_CONTROL,HttpHeaderValues.NO_CACHE).
                set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON).
                set(HttpHeaderNames.CONNECTION,HttpHeaderValues.KEEP_ALIVE).
                set(HttpHeaderNames.ACCEPT,"*/*").
                set(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.GZIP_DEFLATE);
        String hostAddress = "";
        try{
            InetAddress address = InetAddress.getLocalHost();
            hostAddress = address.getHostAddress();
        }catch (Exception e){
            log.error(" 获取当前服务器Host失败:{}",e.getMessage());
        }
        if(!StringUtils.isEmpty(hostAddress)){
            request.headers().
                    set(HttpHeaderNames.HOST, hostAddress);
        }
        log.info("请求地址uri={}, 请求报文头header={}, 请求报文内容context={}",uri,request.headers(),request.content().toString(CharsetUtil.UTF_8));
        return request;
    }


    protected <V> V get(Promise<V> future) {
        return get(future,TIME_OUT);
    }

    protected <V> V get(Promise<V> future,long timeout) {
        if (!future.isDone()) {
            CountDownLatch l = new CountDownLatch(1);
            future.addListener(new GenericFutureListener<Future<? super V>>() {
                @Override
                public void operationComplete(Future<? super V> future) throws Exception {
                    log.info("received response,listener is invoked");
                    if (future.isDone()) {
                        // io线程会回调该listener
                        l.countDown();
                    }
                }
            });
            boolean interrupted = false;
            if (!future.isDone()) {
                try {
                    l.await(30, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    log.error("e:{}", e);
                    interrupted = true;
                }

            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        if (future.isSuccess()) {
            return future.getNow();
        }
        log.error("wait result time out");
        return null;
    }

    protected String parseDeviceMsg(NettyHttpResponse nettyHttpResponse,String rpid){
        String message = null;
        if(nettyHttpResponse == null){
            log.error("接收设备响应报文超时");
            message = resBusiMessage(RETCODE_9001,rpid);
        }else {
            ResponseDto responseDto = JSONObject.parseObject(nettyHttpResponse.getBody(),new TypeReference<ResponseDto>(){});
            String retCode = RETCODE_OK;
            if(responseDto.getCode()!=0){
                retCode = String.format("%04d",responseDto.getCode());
            }
            Map<String, Object> map = new HashMap<>();
            map.put(INNER_RES_RETCODE, retCode);
            map.put(INNER_RES_RETMSG, dynProperties.get(retCode));
            map.put(INNER_RPID, rpid);
            message = JSON.toJSONString(map);
        }
        return message;
    }
}
